#!/usr/bin/env python
# s3funnel - Multithreaded tool for performing operations on Amazon's S3
# Copyright (c) 2008 Andrey Petrov
#
# This module is part of s3funnel and is released under
# the MIT license: http://www.opensource.org/licenses/mit-license.php

"""
s3funnel is a multithreaded tool for performing operations on Amazon's S3.

Key Operations:
    DELETE Delete key from the bucket
    GET    Get key from the bucket
    PUT    Put file into the bucket (key corresponds to filename)

Bucket Operations:
    CREATE Create a new bucket
    DROP   Delete an existing bucket (must be empty)
    LIST   List keys in the bucket. If no bucket is given, buckets will be listed.
"""

"""
TODO:
* Use callback to do an interactive progress bar
"""

import logging
log = logging.getLogger()

# Official Python modules
import os
import sys
import time
import signal
import threading
import socket
import httplib

from glob import glob
from optparse import OptionParser

# Third party modules
import workerpool
import boto

# Local imports
from s3funnel.jobs import GetJob, PutJob, DeleteJob, list_bucket, create_bucket, drop_bucket

event_stop = threading.Event()

class S3ToolBox(object):
    def __init__(self, aws_key, aws_secret_key, bucket):
        self.conn = boto.connect_s3(aws_key, aws_secret_key)
        self.bucket = self.conn.get_bucket(bucket)

def main():
    # Parse the command line...
    usage="%prog BUCKET OPERATION [OPTIONS] [FILE]...\n" + __doc__
    parser = OptionParser(usage)
    parser.add_option("-a", "--aws_key",    dest="aws_key", type="string", help="Overrides AWS_ACCESS_KEY_ID environment variable")
    parser.add_option("-s", "--aws_secret_key", dest="aws_secret_key", type="string", help="Overrides AWS_SECRET_ACCESS_KEY environment variable")
    parser.add_option("-t", "--threads",    dest="numthreads", default=1, type="int", metavar="N", help="Number of threads to use [default: %default]")
    parser.add_option("-T", "--timeout",    dest="timeout", default=0, type="float", metavar="SECONDS", help="Socket timeout time, 0 is never [default: %default]")
    parser.add_option("--start_key",        dest="start_key", type="string", default=None, metavar="KEY", help="(`list` only) Start key for list operation")
    parser.add_option("--acl",              dest="acl", type="string", default="public-read", help="(`put` only) Set the ACL permission for each file [default: %default]")
    parser.add_option("-i", "--input",      dest="input", type="string", metavar="FILE", help="Read one file per line from a FILE manifest.")
    parser.add_option("-v", "--verbose",    dest="verbose", action="count", default=None, help="Enable verbose output. Use twice to enable debug output.")

    options, args = parser.parse_args()

    # Check input
    aws_key = os.environ.get("AWS_ACCESS_KEY_ID")
    aws_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")

    ## AWS
    if options.aws_key:
        aws_key = options.aws_key
    if options.aws_secret_key:
        aws_secret_key = options.aws_secret_key
    if None in [aws_key, aws_secret_key]:
        parser.error("Missing required arguments `aws_key` or `aws_secret_key`")

    ## Threads
    if options.numthreads < 1:
        parser.error("`theads` must be at least 1")

    ## Misc. options
    if options.timeout:
        try:
            socket.setdefaulttimeout(options.timeout)
        except TypeError, e:
            parser.error("`timeout` error: %s" % e.message)

    # Arguments
    if len(args) < 1:
        parser.error("BUCKET not specified")
    bucket = args[0]
    if len(args) < 2:
        # Exception for single-argument operations
        if bucket.lower() in ['list']:
            operation = bucket.lower()
            bucket = None
        else:
            parser.error("OPERATION not specified")
    else:
        operation = args[1].lower()

    # Setup factories
    def toolbox_factory():
        return S3ToolBox(aws_key, aws_secret_key, bucket)
    def worker_factory(job_queue):
        return workerpool.EquippedWorker(job_queue, toolbox_factory)

    # Setup logging
    if options.verbose > 1:
        log.setLevel(logging.DEBUG)
    elif options.verbose > 0:
        log.setLevel(logging.INFO)

    # Setup operation configuration
    config = {'acl': options.acl,
              'start_key': options.start_key,
              'aws_key': aws_key,
              'aws_secret_key': aws_secret_key,
              'bucket': bucket,
              }

    # Setup operation mapping
    job_map = {'get': GetJob,
               'put': PutJob,
               'delete': DeleteJob,
               }

    method_map = {'list': list_bucket,
                  'create': create_bucket,
                  'drop': drop_bucket,
                  }

    # Confirm that the requested operation exists
    valid_operations = job_map.keys() + method_map.keys()
    valid_operations.sort()
    if operation not in valid_operations:
        parser.error("OPERATION must be one of: %s" % ', '.join(valid_operations))

    if operation in method_map:
        try:
            # NOTE: Single-thread methods accept a toolbox_factory method instead of the resulting toolbox instance.
            #       This makes the creation of the toolbox optional.
            return method_map[operation](toolbox_factory, **config)
        except boto.exception.S3ResponseError, e:
            if e.status >= 400 and e.status < 500:
                log.error("%d: %s" % (e.status, e.reason))
            else:
                log.critical("Unexpected server error: %r" % e)
            return -1
    
    if operation not in job_map:
        parser.error("Operation `%s' not implemented yet." % operation)

    # Get data source
    input_src = None
    if options.input:
        # Get source from manifest or stdin (via -i flag)
        if options.input == '-':
            input_src = "stdin"
            options.input = sys.stdin
        try:
            data = open(glob(options.input)[0])
        except IOError, IndexError:
            log.error("%s: File not found" % options.input)
            return -1
        input_src = "`%s'" % options.input
    elif len(args) < 3:
        # Get source from stdin
        input_src = "stdin"
        data = sys.stdin
    else:
        if operation == 'put':
            # Get source from glob-expanded arguments
            data = []
            for arg in args[2:]:
                found = glob(arg)
                if not found:
                    log.error("%s: No such file." % arg)
                    continue
                data += found
        else:
            data = args[2:]
        input_src = "arguments: %s" % ', '.join(data)

    # Fire up the thread pool
    log.info("Starting pool with %d threads..." % options.numthreads)
    try:
        pool = workerpool.WorkerPool(options.numthreads, maxjobs=options.numthreads*2, worker_factory=worker_factory)
    except Exception, e:
        log.critical("Failed starting workers: %r" % e)
        return -1

    # Setup interrupt handling
    def shutdown(signum, stack):
        log.warning("Interrupted, shutting down...")
        event_stop.set()
    signal.signal(signal.SIGINT, shutdown)

    # Start feeding jobs into the workers
    log.info("Using files from %s" % input_src )
    Job = job_map[operation]
    try:
        for item in data:
            if event_stop.isSet(): break
            j = Job(item.strip(), config)
            pool.put(j)
    except IOError:
        log.warning("Aborted.")

    pool.shutdown()
    pool.join()

    return 0

if __name__ == "__main__":
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter('%(levelname)-8s %(message)s'))
    log.addHandler(log_handler)

    main()
